In [1]:
import sys
if not '..' in sys.path:
sys.path.append('..')
from draw_workflow import draw_workflow
In [2]:
from noodles import schedule, run, run_parallel, gather
In [3]:
@schedule
def add(a, b):
return a+b
@schedule
def sub(a, b):
return a-b
@schedule
def mul(a, b):
return a*b
Our fledgeling Python script kiddie then enters the following code
In [4]:
u = add(5, 4)
v = sub(u, 3)
w = sub(u, 2)
x = mul(v, w)
In [5]:
draw_workflow('callgraph1.png', x._workflow)
resulting in this workflow:
We may run this in parallel!
In [6]:
run_parallel(x, n_threads = 2)
Out[6]:
In [7]:
def schedule(f):
@wraps(f)
def wrapped(*args, **kwargs):
bound_args = signature(f).bind(*args, **kwargs)
bound_args.apply_defaults()
return PromisedObject(merge_workflow(f, bound_args))
return wrapped
In [8]:
class PromisedObject:
def __init__(self, workflow):
self._workflow = workflow
def __call__(self, *args, **kwargs):
return _do_call(self._workflow, *args, **kwargs)
def __getattr__(self, attr):
if attr[0] == '_':
return self.__dict__[attr]
return _getattr(self._workflow, attr)
def __setattr__(self, attr, value):
if attr[0] == '_':
self.__dict__[attr] = value
return
self._workflow = get_workflow(_setattr(self._workflow, attr, value))
In [9]:
def merge_workflow(f, bound_args):
variadic = next((x.name for x in bound_args.signature.parameters.values()
if x.kind == Parameter.VAR_POSITIONAL), None)
if variadic:
bound_args.arguments[variadic] = list(bound_args.arguments[variadic])
node = FunctionNode(f, bound_args)
idx = id(node)
nodes = {idx: node}
links = {idx: set()}
for address in serialize_arguments(bound_args):
workflow = get_workflow(
ref_argument(bound_args, address))
if not workflow:
continue
set_argument(bound_args, address, Parameter.empty)
for n in workflow.nodes:
if n not in nodes:
nodes[n] = workflow.nodes[n]
links[n] = set()
links[n].update(workflow.links[n])
links[workflow.top].add((idx, address))
return Workflow(id(node), nodes, links)
In [10]:
from noodles import schedule, run, run_parallel, gather
In [11]:
@schedule
def sum(a, buildin_sum = sum):
return buildin_sum(a)
In [12]:
r1 = add(1, 1)
r2 = sub(3, r1)
def foo(a, b, c):
return mul(add(a, b), c)
multiples = [foo(i, r2, r1) for i in range(6)]
r5 = sum(gather(*multiples))
In [13]:
draw_workflow('callgraph2.png', r5._workflow)
In [14]:
run_parallel(r5, n_threads = 4)
Out[14]:
In [15]:
@schedule
def sqr(a):
return a*a
@schedule
def map(f, lst):
return gather(*[f(x) for x in lst])
@schedule
def num_range(a, b):
return range(a, b)
In [16]:
wf = sum(map(sqr, num_range(0, 1000)))
In [17]:
draw_workflow('callgraph3.png', wf._workflow)
In [18]:
run_parallel(wf, n_threads=4)
Out[18]:
In [19]:
@schedule
class A:
def __init__(self, value):
self.value = value
def multiply(self, factor):
self.value *= factor
In [20]:
a = A(5)
a.multiply(10)
a.second = 7
In [21]:
draw_workflow("callgraph4.png", a._workflow)
In [22]:
@schedule
class A:
def __init__(self, value):
self.value = value
def multiply(self, factor):
self.value *= factor
return self
In [23]:
a = A(5)
a = a.multiply(10)
a.second = 7
In [24]:
draw_workflow("callgraph5.png", a._workflow)
In [25]:
result = run_parallel(a, n_threads=4)
print(result.value, result.second)